package local

import (
	"container/list"
	"context"
	"errors"
	"gitee.com/gitee-go/core/common"
	comm2 "gitee.com/gitee-go/server/engine/comm"
	"gitee.com/gitee-go/server/engine/dao"
	runtimes "runtime"
	"runtime/debug"
	"strings"
	"sync"
	"time"

	"gitee.com/gitee-go/core"
	"gitee.com/gitee-go/core/runtime"
	"gitee.com/gitee-go/server/comm"
	"gitee.com/gitee-go/utils/ioex"
)

/**
流水线调度器
主要管理流水线运行队列,运行状态(taskItem)
*/
type BuildEngine struct {
	cliegn *ClientEngine
	ctx    context.Context
	tasks  ioex.SyncMap
	tsklk  sync.Mutex
	taskw  *list.List

	joblk sync.Mutex
	jobs  ioex.SyncMap
	jobki *list.List

	lntm time.Time
}

func StartBuildEngine(ctx context.Context, cliegn *ClientEngine) *BuildEngine {
	c := &BuildEngine{
		cliegn: cliegn,
		ctx:    ctx,
		taskw:  list.New(),
		jobki:  list.New(),
	}
	go func() {
		c.init()
		for !ioex.CheckContext(c.ctx) {
			c.run()
			time.Sleep(time.Millisecond * 10)
		}
	}()
	return c
}

func (c *BuildEngine) init() {
	defer func() {
		if err := recover(); err != nil {
			core.LogPnc.Errorf("local.BuildEngine init:%+v", err)
			core.LogPnc.Errorf("%s", string(debug.Stack()))
		}
	}()

	// TODO: 调试不执行
	if comm.Debugs {
		return
	}
	// 每次重启,需要把所有的任务改成取消状态,因为进程可能会被用户kill,导致状态还是运行中的情况
	cont := "server restart"
	comm.DBMain.GetDB().Exec(
		"update `t_build` set `status`=?,`error`=? where `status`!=? and `status`!=? and `status`!=?",
		common.BUILD_STATUS_CANCEL, cont, common.BUILD_STATUS_OK, common.BUILD_STATUS_ERROR, common.BUILD_STATUS_CANCEL,
	)
	comm.DBMain.GetDB().Exec(
		"update `t_stage` set `status`=?,`error`=? where `status`!=? and `status`!=? and `status`!=?",
		common.BUILD_STATUS_CANCEL, cont, common.BUILD_STATUS_OK, common.BUILD_STATUS_ERROR, common.BUILD_STATUS_CANCEL,
	)
	comm.DBMain.GetDB().Exec(
		"update `t_job` set `status`=?,`error`=? where `status`!=? and `status`!=? and `status`!=?",
		common.BUILD_STATUS_CANCEL, cont, common.BUILD_STATUS_OK, common.BUILD_STATUS_ERROR, common.BUILD_STATUS_CANCEL,
	)
}
func (c *BuildEngine) run() {
	defer func() {
		if err := recover(); err != nil {
			core.LogPnc.Errorf("local.BuildEngine runTask:%+v", err)
			core.LogPnc.Errorf("%s", string(debug.Stack()))
		}
	}()
	// tasks是每个流水线build,jobs是流水线内step任务的集合,不一定属于同一个build
	// 检测任务完成,从队列删除,防止内存占用
	c.tasks.Range(func(key string, value interface{}) bool {
		v := value.(*taskItem)
		if v.end {
			c.tasks.Delun(key)
			return false
		}
		return true
	})
	// 检测任务完成,从队列删除,防止内存占用
	c.jobs.Range(func(key string, value interface{}) bool {
		v := value.(*comm2.SyncJob)
		v.Lock()
		end := v.End
		v.Unlock()
		if end {
			c.jobs.Delun(key)
			return false
		}
		return true
	})

	// 把流水线build等待队列取出,如果运行中没超过限制,则运行build并加入tasks运行中队列
	c.tsklk.Lock()
	e := c.taskw.Front()
	if e != nil && (comm.BuildLimit == 0 || c.tasks.Len() < comm.BuildLimit) {
		v := e.Value.(*taskItem)
		v.start()                  //
		c.tasks.Put(v.build.Id, v) // 加入运行中队列
		c.taskw.Remove(e)
	}
	c.tsklk.Unlock()

	if time.Since(c.lntm).Seconds() > 5 {
		c.lntm = time.Now()
		var m runtimes.MemStats
		runtimes.ReadMemStats(&m)
		core.Log.Debugf("tasks ln(%d/%d),jobs ln(%d)", c.tasks.Len(), comm.BuildLimit, c.jobs.Len())
		//core.Log.Infof("memory heap-allc:%dK,heap-sys:%dK", m.HeapAlloc/1024,m.HeapSys/1024)
	}
}

// Put 提交build任务到等待队列,等待任务执行
func (c *BuildEngine) Put(bld *runtime.Build) error {
	if bld == nil || bld.Id == "" {
		return errors.New("id is empty")
	}
	if bld.Cfg == nil {
		return errors.New("cfg is nil")
	}
	task := &taskItem{
		prt:   c,
		build: bld,
	}
	c.tsklk.Lock()
	defer c.tsklk.Unlock()
	//c.tasks.Store(bld.Id, task)
	c.taskw.PushBack(task)
	return nil
}

// PullJob 拉取任务需要执行的任务,一般来自runner的请求
func (c *BuildEngine) PullJob(id string) *comm2.SyncJob {
	//client id
	if id == "" {
		return nil
	}
	info, ok := c.cliegn.GetCliInfo(id)
	if !ok {
		return nil
	}
	c.joblk.Lock()
	defer c.joblk.Unlock()
	// 循环每一个job,如果基础插件匹配,则返回job,如果不匹配,不返回任务,继续等待
	for fe := c.jobki.Front(); fe != nil; fe = fe.Next() {
		jid := fe.Value.(string)
		job, ok := c.jobs.Get(jid)
		if !ok {
			continue
		}
		rt := job.(*comm2.SyncJob)
		plug := strings.TrimSpace(rt.Job().Job)
		if rt.Job().Plugined {
			plug = "shell@sh"
		}
		if plug != "" {
			ok = false
			for _, v := range info.Plugin {
				if v == plug {
					ok = true
					break
				}
			}
			//core.Log.Debugf("PullJob pluged:%t,plug:%s,%t,plugs(%s):%v", rt.Job().Plugined, plug, ok, info.Id, info.Plugin)
			if !ok {
				continue
			}
		}
		//task.agt = a
		//c.jobkd.PushBack(jid)
		c.jobki.Remove(fe)
		core.Log.Debugf("BuildEngine client(%s) PullJob:%s", id, rt.Job().Id)
		return rt
	}
	return nil
}

// 把job插入jobs队列 工作在build engine
func (c *BuildEngine) putJob(job *comm2.SyncJob) error {
	if job == nil || job.Job() == nil || job.Job().Id == "" {
		return errors.New("param err")
	}
	core.Log.Debug("BuildEngine putJob start:", job.Job().Id)
	c.joblk.Lock()
	defer c.joblk.Unlock()
	c.jobki.PushBack(job.Job().Id)
	c.jobs.Put(job.Job().Id, job)
	// core.Log.Debug("BuildEngine putJob end:iln(%d),sln(%d)", c.jobki.Len())
	return nil
}

// 更新job状态,运行中,完成,错误,取消
func (c *BuildEngine) JobUpdate(req *runtime.ReqJobUpdate) error {
	if req == nil || req.Jobid == "" || req.Status == "" {
		return errors.New("param err")
	}
	c.joblk.Lock()
	defer c.joblk.Unlock()
	e, ok := c.jobs.Get(req.Jobid)
	if !ok {
		return comm2.BuildErrNoJob
	}
	job := e.(*comm2.SyncJob)
	job.Lock()
	job.Job().Status = req.Status
	job.Job().Error = req.Error
	job.Job().ExitCode = req.ExitCode
	job.Unlock()
	if err := dao.UpdateJob(job.Job()); err != nil {
		core.Log.Errorf("dao.updateJob err:%v", err)
	}
	return nil
}

// 停止构建
func (c *BuildEngine) StopTask(bdid string) bool {
	bd, ok := c.tasks.Get(bdid)
	if !ok {
		return false
	}
	task := bd.(*taskItem)
	task.ctrlend = true
	task.ctrlendtm = time.Now()
	return true
}

// 获取build状态
func (c *BuildEngine) GetTaskStat(bdid string) (*runtime.BuildTaskStat, bool) {
	bd, ok := c.tasks.Get(bdid)
	if !ok {
		return nil, false
	}
	task := bd.(*taskItem)
	return &runtime.BuildTaskStat{
		End:     task.end,
		CtrlEnd: task.ctrlend,
	}, true
}

// 获取运行中的Job
func (c *BuildEngine) GetSyncJob(bdid, snm, jnm string) (*comm2.SyncJob, bool) {
	if bdid == "" || snm == "" || jnm == "" {
		return nil, false
	}
	bd, ok := c.tasks.Get(bdid)
	if !ok {
		return nil, false
	}
	task := bd.(*taskItem)
	task.staglk.Lock()
	defer task.staglk.Unlock()
	stg, ok := task.stages[snm]
	if !ok {
		return nil, false
	}
	job, ok := stg.jobs[jnm]
	return job, ok
}

// 通过名称查找job
func (c *BuildEngine) FindSyncJob(bdid, sid, jnm string) (*comm2.SyncJob, bool) {
	bd, ok := c.tasks.Get(bdid)
	if !ok {
		return nil, false
	}
	task := bd.(*taskItem)
	task.staglk.Lock()
	defer task.staglk.Unlock()
	for _, v := range task.stages {
		if v.stage.Id == sid {
			job, ok := v.jobs[jnm]
			return job, ok
		}
	}

	return nil, false
}
